import akka.actor.{Props, ActorRef, ActorSystem}
import dao.RedisDao
import extension.Extension
import model.{ActorSystemHelper, InserRedisActor}
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.rdd.RDD
import util.{GlobalArgs, ScopeCaculateUtil}

/**
  * Created by chenjianwen on 2016/3/2.
  */
class YCFActionDataHandler(val path:String) extends Extension{
    def ratingRdd():RDD[(Int,Rating)]={
      //val actorRef = InserRedisActor.getActorRef
      val tempIdDevMapArray = CFContext.sc.textFile(GlobalArgs.act_path).map(x=>{
        x.split(",") match {
          case Array(actType,devId,userId,products,timestamp)=>{
            var tempUserId:Int =  0
            var tempDev:String="**"
            if("0".equals(userId)){
              tempUserId = devId.hashCode
              tempDev = devId
            }else{
              tempUserId = -1
            }
            (tempUserId,tempDev)
          }
        }
      }).filter(x=>(!x._2.equals("**"))).groupBy(x=>x._1).map(x=>(x._1,x._2.iterator.next()._2)).collect()

      (new Thread(()=>{insertDevIdToRedis(tempIdDevMapArray)})).start()

      val res = CFContext.sc.textFile(GlobalArgs.act_path).map(x=>{
        var res:List[((Int,Int),(Int,Long))] = List()
          x.split(",") match {
              case Array(actType,devId,userId,products,timestamp)=>{
                var tempUserId:Int =  0
                if("0".equals(userId)){
                  tempUserId = devId.hashCode
                }else{
                  tempUserId = userId.toInt
                }
                //有可能一个行为数据对应多个订单
                val productss = products.split("#")
                for(one <- productss){
                  res = res :+((tempUserId,one.toInt),(actType.toInt,timestamp.toLong))
                  //println(res.size)
                }
              }
          }
        res
      }).flatMap(
        x=>x
      ).groupByKey()
        .map(pro=>{
         ((pro._2.iterator.next()._2%10).toInt,new Rating(pro._1._1,pro._1._2,ScopeCaculateUtil.actions2Socrecompute(pro._2)))
      })
      res
    }

  def insertDevIdToRedis(map:Array[(Int,String)]): Unit ={
    for(one <- map){
      //写入redis，记录访客用户的设备号
      val redis = RedisDao.getJedis
      redis.set(s"visitor:${one._1}", one._2 + "#")
      redis.close()
    }
  }


}

